package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.utils.HttpUtils;
import com.data.dev.utils.IPUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 *  滑动窗口内复杂事件解析逻辑实现
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class WindowProcessFuncImpl extends  ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {
    @Override
    public void process(String s, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) {

        List<MailMsg> loginEventList = new ArrayList<>();
        MailMsgAlarm mailMsgAlarm;
        for (MailMsg mailMsg : iterable) {

            log.info("收集到的登录事件【" + mailMsg + "】");

            if (mailMsg.getResult().equals("fail")) { //开始检测当前窗口内的事件，并将失败的事件收集到loginEventList
                log.info("开始检测当前窗口内的事件，并将失败的事件收集到loginEventList");
                loginEventList.add(mailMsg);
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {//如果检测到登录成功事件，但此时登录失败的次数不足20次，则清空loginEventList，等待下一次检测
                log.info("检测到登录成功事件，但此时登录失败的次数为【" + loginEventList.size() + "】不足20次，清空loginEventList，等待下一次检测");
                loginEventList.clear();
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) {
                mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg);
                log.info("检测到登录成功的事件,此时窗口内连续登录失败的次数为【" + mailMsgAlarm.getFailTimes() + "】");

                //一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件，则清空此时的loginEventList并将当前登录成功的事件进行告警推送；
                loginEventList.clear();
                doAlarmPush(mailMsgAlarm);

                collector.collect(mailMsgAlarm);//将当前登录成功的事件进行收集上报
            } else {
                log.info(mailMsg.getUser() + "当前已连续：【" + loginEventList.size() + "】 次登录失败");
            }
        }
    }


    /**
     * 2022年6月17日15:03:06
     * @param eventList:当前窗口内的事件列表
     * @param eventCurrent：当前登录成功的事件
     * @return mailMsgAlarm：告警消息体
     */
    public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){

        String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip();
        String loginFailStartTime = eventList.get(0).getTimestamp_datetime();
        String loginSuccessTime = eventCurrent.getTimestamp_datetime();
        int loginFailTimes = eventList.size();

        MailMsgAlarm mailMsgAlarm = new MailMsgAlarm();
        mailMsgAlarm.setMailMsg(eventCurrent);
        mailMsgAlarm.setAlarmKey(alarmKey);
        mailMsgAlarm.setStartTime(loginFailStartTime);
        mailMsgAlarm.setEndTime(loginSuccessTime);
        mailMsgAlarm.setFailTimes(loginFailTimes);

        return mailMsgAlarm;
    }

    /**
     * 2022年6月17日14:47:53
     * @param mailMsgAlarm :当前构建的需要告警的事件
     */
    public void doAlarmPush(MailMsgAlarm mailMsgAlarm){
        String userKey = mailMsgAlarm.getAlarmKey();
        String clientIp = mailMsgAlarm.mailMsg.getClient_ip();
        boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp);
        if(isWhiteListIp){//如果是白名单IP，不告警
            log.info("当前登录用户【" + userKey + "】属于白名单IP");
        }else {
            //IP归属查询结果、企业微信推送告警
            String user = HttpUtils.getUserByClientIp(clientIp);
            HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString());
        }
    }
}